[CELEBORN-2341] Preserve partially-committed partitions on CommitFiles timeout #3721
shlomitubul wants to merge 2 commits into
Pull request overview
This PR improves the worker-side CommitFiles timeout/cancellation handling in Controller to (a) reply immediately instead of leaving the driver waiting for its ask timeout, (b) stop the COMMIT_FILES_TIME timer on the timeout path, and (c) preserve partially-completed commit state by building a response from the actual committed/empty sets.
Changes:
- Reply immediately and stop the commit-files timer when the async commit future is cancelled/times out.
- Introduce `Controller.buildCommitFilesResponseOnCancel` to compute a truthful `CommitFilesResponse` from the observed commit state (committed/empty vs failed).
- Add `ControllerSuite` unit tests covering partial-success and all-failed cancel responses.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| `worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala` | Replies on cancel/timeout, stops timer, and adds helper to build cancel/timeout response from commit state. |
| `worker/src/test/scala/org/apache/celeborn/service/deploy/worker/ControllerSuite.scala` | Adds unit tests for the new cancel/timeout response builder. |
SteNicholas
left a comment
I traced the full data path on both the driver and reader sides — this is a solid, well-reasoned change. The two unconditional wins (immediate reply, timer-leak fix) are clearly correct, and the riskier partial-success logic is safe.
The safety argument holds (verified)
The crucial claim is that an in-flight partition reported as neither committed nor failed causes silent data loss, so failed = requested − committed − empty is required. Every link checks out:
- `CommitHandler.checkDataLost` keys only off the failed sets: a partition absent from `failedPrimaryPartitionIds`/`failedReplicaPartitionIds` is never flagged.
- `collectResult` only populates the reducer file group from `committedPrimaryIds`.
- `CelebornShuffleReader` (`// filter empty partition` → `.filter(p => fileGroups.partitionGroups.containsKey(p))`) silently skips a partition absent from the file group, with no `FetchFailedException`.
So an in-flight partition absent from both sets is silently dropped at read time. Computing failed = requested − committed − empty correctly routes it through checkDataLost → SHUFFLE_DATA_LOST → recompute. PARTIAL_SUCCESS is confirmed terminal/non-retry (same branch as SUCCESS in doParallelCommitFiles), and the "honest limitations" section matches the all-or-nothing behavior in handleFinalCommitFiles.
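The set arithmetic behind this argument can be sketched in a few lines. This is a minimal illustration, not the PR's `buildCommitFilesResponseOnCancel`: the class `CancelResponseSketch`, its fields, and the string status values are hypothetical stand-ins, and only primary ids are modelled.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for the cancel/timeout response; not Celeborn's CommitFilesResponse.
final class CancelResponseSketch {
    final String status;
    final Set<String> committed;
    final Set<String> failed;

    CancelResponseSketch(String status, Set<String> committed, Set<String> failed) {
        this.status = status;
        this.committed = committed;
        this.failed = failed;
    }

    // Builds a response in which every requested id that is neither committed
    // nor empty is reported as failed, so no id ends up in "neither" set.
    static CancelResponseSketch build(
            List<String> requested, Set<String> committed, Set<String> empty) {
        if (committed.isEmpty()) {
            // Nothing committed: report every requested id as failed.
            return new CancelResponseSketch(
                "COMMIT_FILE_EXCEPTION", Set.of(), new LinkedHashSet<>(requested));
        }
        // failed = requested - committed - empty
        Set<String> failed = new LinkedHashSet<>(requested);
        failed.removeAll(committed);
        failed.removeAll(empty);
        return new CancelResponseSketch(
            "PARTIAL_SUCCESS", new LinkedHashSet<>(committed), failed);
    }

    public static void main(String[] args) {
        // p0 committed, p1 was empty, p2 and p3 were still in flight at cancel time.
        CancelResponseSketch r = build(
            List.of("p0", "p1", "p2", "p3"), Set.of("p0"), Set.of("p1"));
        System.out.println(r.status + " failed=" + r.failed);
        // prints: PARTIAL_SUCCESS failed=[p2, p3]
    }
}
```

The in-flight ids `p2` and `p3` never appear in any worker-side set, yet they come out as failed, which is the property the driver-side `checkDataLost` depends on.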
Concurrency nuance worth a code comment
task.cancel(true) / future.cancel(true) on a CompletableFuture does not interrupt the running task — mayInterruptIfRunning is ignored by CompletableFuture. So when the timer fires, the per-partition commit tasks keep running in commitThreadPool while the error branch snapshots the committed/empty sets. I worked through the race and it's safe: failedPrimaryIds is computed from the live committedPrimaryIds before the committed-list snapshot is taken, and the committed set only grows, so a partition that commits during the window lands in both lists (over-reports failure → safe recompute) — never in "neither". And a partition reaches committedIds only after fileWriter.close() succeeds, so committed ⇒ durable.
This is correct but non-obvious and the ordering is load-bearing. A one-line comment noting the snapshot is best-effort (tasks may still be running since cancel doesn't interrupt) would protect against a future reorder.
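The non-interrupting behaviour of `CompletableFuture.cancel(true)` is easy to confirm in isolation. The sketch below is a standalone demonstration with hypothetical names (`CancelDoesNotInterruptSketch`, `taskFinishedDespiteCancel`); it does not use Celeborn's `commitThreadPool` or the worker's actual tasks.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class CancelDoesNotInterruptSketch {
    // Returns true if the task body ran to completion although its future was cancelled.
    static boolean taskFinishedDespiteCancel() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        AtomicBoolean finished = new AtomicBoolean(false);
        try {
            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                started.countDown();
                try {
                    // await() throws InterruptedException if this thread was interrupted.
                    release.await();
                    finished.set(true);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, pool);

            started.await();      // the task is now running on the pool thread
            future.cancel(true);  // marks the future cancelled; the thread is not interrupted
            release.countDown();  // let the still-running task proceed
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
            return future.isCancelled() && finished.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(taskFinishedDespiteCancel()); // prints: true
    }
}
```

The future reports `isCancelled()` while the task body still finishes. That is why the committed set can keep growing after the timer fires, and why the failed ids must be derived before the committed snapshot is taken.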
Empty-partition edge case (same as Copilot's note)
The COMMIT_FILE_EXCEPTION branch triggers on committed*Ids.isEmpty, which marks genuinely-empty partitions (tracked in emptyFile*Ids) as failed → unnecessary recompute in the rare "all partitions empty, none committed" case. Not a correctness bug — over-reporting failure is always safe, and it byte-for-byte preserves the prior no-commit response. If you'd rather optimize it, gate on committed.isEmpty && empty.isEmpty; otherwise a quick reply on the Copilot thread noting it's an intentional conservative choice would close it out.
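The two gating choices differ only in the "all empty, none committed" case. A minimal sketch follows, with hypothetical method names and string statuses standing in for the real `StatusCode` values, and assuming the non-exception branch is the `PARTIAL_SUCCESS` path:

```java
import java.util.Set;

final class EmptyGateSketch {
    // Conservative gate as described above: no committed ids means all-failed.
    static String conservativeStatus(Set<String> committed) {
        return committed.isEmpty() ? "COMMIT_FILE_EXCEPTION" : "PARTIAL_SUCCESS";
    }

    // Suggested alternative gate: all-failed only if nothing committed AND nothing was empty.
    static String optimizedStatus(Set<String> committed, Set<String> empty) {
        return (committed.isEmpty() && empty.isEmpty())
            ? "COMMIT_FILE_EXCEPTION"
            : "PARTIAL_SUCCESS";
    }

    public static void main(String[] args) {
        Set<String> committed = Set.of();        // nothing committed
        Set<String> empty = Set.of("p0", "p1");  // every partition genuinely empty
        System.out.println(conservativeStatus(committed));     // prints: COMMIT_FILE_EXCEPTION
        System.out.println(optimizedStatus(committed, empty)); // prints: PARTIAL_SUCCESS
    }
}
```

With the conservative gate the empty partitions are reported failed and recomputed. With the alternative gate they would stay out of the failed list, at the cost of diverging from the prior no-commit response.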
Minor
- Tests cover only the pure helper (the risky part — acceptable), but not the empty-only edge case above, which is exactly the Copilot scenario — worth adding.
None of this blocks merge. Nice work, and the writeup/limitations section is appreciated.
🤖 Generated with Claude Code
…s timeout

When `celeborn.worker.commitFiles.timeout` fires and `future.cancel(true)` / `task.cancel(true)` interrupt the per-partition commit tasks, the worker's error-path response had three problems that amplified data loss:

1. The response was built with empty committed lists, discarding all partitions that committed before the timer fired.
2. `context.reply()` was never called, so the driver waited out `celeborn.client.rpc.commitFiles.askTimeout` instead of getting the verdict.
3. The `COMMIT_FILES_TIME` timer was never stopped on this path (leak).

This builds the response from the actual committed / empty / failed state and returns `PARTIAL_SUCCESS` when any partition committed.

Crucially, tasks still queued (or interrupted before reaching a terminal state) when cancellation fires land in NONE of the committed / empty / failed sets, so the failed list is computed as `requested - committed - empty` rather than only the explicitly-failed ids. Otherwise the driver's `CommitHandler.checkDataLost` cannot distinguish an in-flight (has data, uncommitted) partition from an empty (no data) one -- both are absent from committed and failed -- and would silently treat it as empty-and-valid, producing wrong reducer results with no `FetchFailedException`. Reporting them as failed makes the driver recompute them.

The response construction is extracted into `Controller.buildCommitFilesResponseOnCancel` and covered by `ControllerSuite`.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Thanks for the thorough trace; addressed both points in the latest push (
Appreciate the review. 🤖 Generated with Claude Code
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
What changes were proposed in this pull request?
This targets `main` and supersedes #3706 (which targeted `branch-0.6`).

When `celeborn.worker.commitFiles.timeout` fires, `Controller` cancels the per-partition commit tasks (`future.cancel(true)` / `task.cancel(true)`) and runs the `handleAsync` error branch. Today that branch:
- never calls `context.reply()`: the originating `CommitFiles` RPC is left unanswered. The driver only learns the outcome after its ask times out (`celeborn.client.rpc.commitFiles.askTimeout`, which falls back to `celeborn.rpc.askTimeout`, default `60s`; often raised to `240s`+), after which it re-asks with the same commit epoch and the worker's dedup returns the cached response;
- builds the response with empty committed lists (`COMMIT_FILE_EXCEPTION` with all requested ids failed), discarding partitions that finished closing before the timer fired;
- never stops the `COMMIT_FILES_TIME` timer (latent leak).

This PR makes the cancel/timeout branch:
- call `context.reply(response)` immediately, so the driver doesn't wait out the ask timeout;
- call `workerSource.stopTimer(COMMIT_FILES_TIME, shuffleKey)`;
- build the response via `Controller.buildCommitFilesResponseOnCancel`: `PARTIAL_SUCCESS` with the partitions that committed when any did, and the unchanged `COMMIT_FILE_EXCEPTION` when none did.

In the `PARTIAL_SUCCESS` branch the failed lists are computed as `requested − committed − empty`, not just the explicitly-failed ids (see correctness note below).

Why are the changes needed?
Primary: eliminate the driver stall. On every commit timeout the driver currently blocks for a full `commitFiles.askTimeout` before the same-epoch retry retrieves the worker's cached response. Replying immediately removes that wait (and the redundant retry round). The `stopTimer` call fixes a timer leak on the same path. These two are unconditional wins on every timeout.

Secondary: make the partial-success path safe. Tasks still queued (or interrupted before reaching a terminal state) when cancellation fires land in none of the worker's `committed`/`empty`/`failed` sets. If such a partition were reported as neither committed nor failed, the driver would record it nowhere; `checkDataLost` keys only off the failed sets, so it would not flag it; `collectResult` would omit it from the reducer file group; and on the read side `CelebornShuffleReader` treats a partition absent from the file group as empty-and-valid (it filters the requested range to `partitionGroups.containsKey(p)`), so reducers would silently produce wrong results with no `FetchFailedException`. Computing `failed = requested − committed − empty` guarantees every not-actually-committed partition is reported failed, routing it through `checkDataLost` → `SHUFFLE_DATA_LOST` → stage recompute. This is required to return `PARTIAL_SUCCESS` without regressing into silent data loss.

Scope / honest limitations
Reduce-shuffle finalization is all-or-nothing: `ReducePartitionCommitHandler.handleFinalCommitFiles` calls `collectResult` (which populates the reducer file groups) only when `checkDataLost` returns `false`, and `checkDataLost` flags the whole shuffle if any primary failed (non-replicated) or any partition failed on both replicas. So this PR does not let reducers fetch the committed partitions while only the lost ones recompute: when a timeout leaves any partition uncommitted in a non-replicated shuffle, the whole map stage still recomputes, exactly as before. Preserving committed partitions avoids recompute only when the timeout left nothing actually uncommitted (e.g. the timer fired as the last file closed), and otherwise just keeps the worker's report truthful. The dependable, every-time benefits are the immediate reply and the timer fix.

Does this PR introduce any user-facing change?
No protocol or API change. `StatusCode.PARTIAL_SUCCESS` is already part of `CommitFilesResponse` and is already handled by the driver as a terminal, non-retry status (same branch as `SUCCESS` in `CommitHandler.doParallelCommitFiles`).

How was this patch tested?
New `ControllerSuite` unit tests for `buildCommitFilesResponseOnCancel`:
- partial-success case: `PARTIAL_SUCCESS`, committed preserved, the in-flight ids reported failed, the empty id not failed;
- all-failed case: `COMMIT_FILE_EXCEPTION` with all requested ids failed.

Built and `spotless:check`-clean on `worker` with Java 17.

🤖 Generated with Claude Code
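The all-or-nothing rule stated under "Scope / honest limitations" can be paraphrased as a small predicate. This is a sketch of the rule as the description words it, not the body of Celeborn's `checkDataLost`; the class, method, and parameter names are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

final class DataLostRuleSketch {
    // Paraphrase of the rule in the description: the whole shuffle is flagged if
    // any primary failed (non-replicated) or any partition failed on both replicas.
    static boolean dataLost(
            Set<String> failedPrimary, Set<String> failedReplica, boolean replicated) {
        if (!replicated) {
            return !failedPrimary.isEmpty();
        }
        Set<String> failedOnBoth = new HashSet<>(failedPrimary);
        failedOnBoth.retainAll(failedReplica);
        return !failedOnBoth.isEmpty();
    }

    public static void main(String[] args) {
        // Non-replicated: a single failed primary flags the whole shuffle.
        System.out.println(dataLost(Set.of("p2"), Set.of(), false));    // prints: true
        // Replicated: p2 failed only on the primary, so a replica copy survives.
        System.out.println(dataLost(Set.of("p2"), Set.of("p3"), true)); // prints: false
        // Replicated: p2 failed on both sides.
        System.out.println(dataLost(Set.of("p2"), Set.of("p2"), true)); // prints: true
    }
}
```

Under this reading, a non-replicated shuffle with even one in-flight partition at timeout still recomputes in full, which matches the limitation the description spells out.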